// Copyright (C) Kumo inc. and its affiliates.
// Author: Jeff.li lijippy@163.com
// All rights reserved.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//


#pragma once

#include <atomic>
#include <chrono>
#include <limits>
#include <list>
#include <memory>
#include <string>

#include <fmt/format.h>
#include <melon/synchronized.h>
#include <turbo/flags/flag.h>
#include <turbo/flags/declare.h>
#include <turbo/log/logging.h>

#include <pollux/common/base/exceptions.h>
#include <melon/cportability.h>
#include <turbo/log/logging.h>
#include "melon/likely.h"
#include <melon/random.h>
#include "melon/shared_mutex.h"
#include <pollux/common/base/checked_arithmetic.h>
#include <pollux/common/base/succinct_printer.h>
#include <pollux/common/memory/allocation.h>
#include <pollux/common/memory/memory_allocator.h>
#include <pollux/common/memory/memory_pool.h>
#include <pollux/flag_definitions/flags.h>

namespace kumo::pollux::memory {
#define POLLUX_MEM_LOG_PREFIX "[MEM] "
#define POLLUX_MEM_LOG(severity) KLOG(severity) << POLLUX_MEM_LOG_PREFIX
#define POLLUX_MEM_LOG_EVERY_SEC(severity) \
  KLOG_EVERY_SEC(severity) << POLLUX_MEM_LOG_PREFIX

#define POLLUX_MEM_ALLOC_ERROR(errorMessage)                         \
  _POLLUX_THROW(                                                     \
      ::kumo::pollux::PolluxRuntimeError,                         \
      ::kumo::pollux::error_source::kErrorSourceRuntime.c_str(), \
      ::kumo::pollux::error_code::kMemAllocError.c_str(),        \
      /* isRetriable */ true,                                       \
      "{}",                                                         \
      errorMessage);

    struct MemoryManagerOptions {
        /// Specifies the default memory allocation alignment.
        uint16_t alignment{MemoryAllocator::kMaxAlignment};

        /// If true, enable memory usage tracking in the default memory pool.
        bool trackDefaultUsage{
            turbo::get_flag(FLAGS_pollux_enable_memory_usage_track_in_default_memory_pool)
        };

        /// If true, check the memory pool and usage leaks on destruction.
        ///
        /// TODO: deprecate this flag after all the existing memory leak use cases
        /// have been fixed.
        bool checkUsageLeak{turbo::get_flag(FLAGS_pollux_memory_leak_check_enabled)};

        /// Terminates the process and generates a core file on an allocation failure
        bool coreOnAllocationFailureEnabled{false};

        /// Disables the memory manager's tracking on memory pools.
        bool disableMemoryPoolTracking{false};

        /// ================== 'MemoryAllocator' settings ==================

        /// Specifies the max memory allocation capacity in bytes enforced by
        /// MemoryAllocator, default unlimited.
        int64_t allocatorCapacity{kMaxMemory};

        /// If true, uses MmapAllocator for memory allocation which manages the
        /// physical memory allocation on its own through std::mmap techniques. If
        /// false, use MallocAllocator which delegates the memory allocation to
        /// std::malloc.
        bool useMmapAllocator{false};

        /// Number of pages in the largest size class in MmapAllocator.
        int32_t largestSizeClassPages{256};

        /// If true, allocations larger than largest size class size will be delegated
        /// to ManagedMmapArena. Otherwise a system mmap call will be issued for each
        /// such allocation.
        ///
        /// NOTE: this only applies for MmapAllocator.
        bool useMmapArena{false};

        /// Used to determine MmapArena capacity. The ratio represents
        /// 'allocatorCapacity' to single MmapArena capacity ratio.
        ///
        /// NOTE: this only applies for MmapAllocator.
        int32_t mmapArenaCapacityRatio{10};

        /// If not zero, reserve 'smallAllocationReservePct'% of space from
        /// 'allocatorCapacity' for ad hoc small allocations. And those allocations
        /// are delegated to std::malloc. If 'maxMallocBytes' is 0, this value will be
        /// disregarded.
        ///
        /// NOTE: this only applies for MmapAllocator.
        uint32_t smallAllocationReservePct{0};

        /// The allocation threshold less than which an allocation is delegated to
        /// std::malloc(). If it is zero, then we don't delegate any allocation
        /// std::malloc, and 'smallAllocationReservePct' will be automatically set to
        /// 0 disregarding any passed in value.
        ///
        /// NOTE: this only applies for MmapAllocator.
        int32_t maxMallocBytes{3072};

        /// The memory allocations with size smaller than this threshold check the
        /// capacity with local sharded counter to reduce the lock contention on the
        /// global allocation counter. The sharded local counters reserve/release
        /// memory capacity from the global counter in batch. With this optimization,
        /// we don't have to update the global counter for each individual small
        /// memory allocation. If it is zero, then this optimization is disabled. The
        /// default is 1MB.
        ///
        /// NOTE: this only applies for MallocAllocator.
        uint32_t allocationSizeThresholdWithReservation{1 << 20};

        /// ================== 'MemoryArbitrator' settings =================

        /// Memory capacity available for query/task memory pools. This capacity
        /// setting should be equal or smaller than 'allocatorCapacity'. The
        /// difference between 'allocatorCapacity' and 'arbitratorCapacity' is
        /// reserved for system usage such as cache and spilling.
        ///
        /// NOTE:
        /// - if 'arbitratorCapacity' is greater than 'allocatorCapacity', the
        /// behavior will be equivalent to as if they are equal, meaning no
        /// reservation capacity for system usage.
        int64_t arbitratorCapacity{kMaxMemory};

        /// The string kind of memory arbitrator used in the memory manager.
        ///
        /// NOTE: the arbitrator will only be created if its kind is set explicitly.
        /// Otherwise MemoryArbitrator::create returns a nullptr.
        std::string arbitratorKind{};

        /// Provided by the query system to validate the state after a memory pool
        /// enters arbitration if not null. For instance, Prestissimo provides
        /// callback to check if a memory arbitration request is issued from a driver
        /// thread, then the driver should be put in suspended state to avoid the
        /// potential deadlock when reclaim memory from the task of the request memory
        /// pool.
        MemoryArbitrationStateCheckCB arbitrationStateCheckCb{nullptr};

        /// Additional configs that are arbitrator implementation specific.
        melon::F14FastMap<std::string, std::string> extraArbitratorConfigs{};

        /// Provides the customized get preferred size function for memory pool
        /// allocation. It returns the actual allocation size for a given input size.
        /// If not set, uses the memory pool's default get preferred size function.
        std::function<size_t(size_t)> getPreferredSize{nullptr};
    };

    /// 'MemoryManager' is responsible for creating allocator, arbitrator and
    /// managing the memory pools.
    class MemoryManager {
    public:
        explicit MemoryManager(
            const MemoryManagerOptions &options = MemoryManagerOptions{});

        ~MemoryManager();

        /// Creates process-wide memory manager using specified options. Throws if
        /// memory manager has already been created by an easier call.
        static void initialize(const MemoryManagerOptions &options);

        /// Returns process-wide memory manager. Throws if 'initialize' hasn't been
        /// called yet.
        static MemoryManager *getInstance();

        /// Deprecated. Do not use. Remove once existing call sites are updated.
        /// Returns the process-wide default memory manager instance if exists,
        /// otherwise creates one based on the specified 'options'.
        MELON_EXPORT static MemoryManager &deprecatedGetInstance(
            const MemoryManagerOptions &options = MemoryManagerOptions{});

        /// Returns true if the memory manager has been set.
        static bool testInstance();

        /// Used by test to override the process-wide memory manager.
        static MemoryManager &testingSetInstance(const MemoryManagerOptions &options);

        /// Returns the memory capacity of this memory manager which puts a hard cap
        /// on memory usage, and any allocation that exceeds this capacity throws.
        int64_t capacity() const;

        /// Returns the memory allocation alignment of this memory manager.
        uint16_t alignment() const;

        /// Creates a root memory pool with specified 'name' and 'maxCapacity'. If
        /// 'name' is missing, the memory manager generates a default name internally
        /// to ensure uniqueness.
        std::shared_ptr<MemoryPool> addRootPool(
            const std::string &name = "",
            int64_t maxCapacity = kMaxMemory,
            std::unique_ptr<MemoryReclaimer> reclaimer = nullptr,
            const std::optional<MemoryPool::DebugOptions> &poolDebugOpts =
                    std::nullopt);

        /// Creates a leaf memory pool for direct memory allocation use with specified
        /// 'name'. If 'name' is missing, the memory manager generates a default name
        /// internally to ensure uniqueness. The leaf memory pool is created as the
        /// child of the memory manager's default root memory pool. If 'threadSafe' is
        /// true, then we track its memory usage in a non-thread-safe mode to reduce
        /// its cpu cost.
        std::shared_ptr<MemoryPool> addLeafPool(
            const std::string &name = "",
            bool threadSafe = true);

        /// Invoked to shrink alive pools to free 'targetBytes' capacity. The function
        /// returns the actual freed memory capacity in bytes. If 'targetBytes' is
        /// zero, then try to reclaim all the memory from the alive pools. If
        /// 'allowSpill' is true, it reclaims the used memory by spilling. If
        /// 'allowAbort' is true, it reclaims the used memory by aborting the queries
        /// with the most memory usage. If both are true, it first reclaims the used
        /// memory by spilling and then abort queries to reach the reclaim target.
        uint64_t shrinkPools(
            uint64_t targetBytes = 0,
            bool allowSpill = true,
            bool allowAbort = false);

        /// Default unmanaged leaf pool with no threadsafe stats support. Libraries
        /// using this method can get a pool that is shared with other threads. The
        /// goal is to minimize lock contention while supporting such use cases.
        ///
        /// TODO: deprecate this API after all the use cases are able to manage the
        /// lifecycle of the allocated memory pools properly.
        MemoryPool &deprecatedSharedLeafPool();

        /// Returns the current total memory usage under this memory manager.
        int64_t getTotalBytes() const;

        /// Returns the number of alive memory pools allocated from addRootPool() and
        /// addLeafPool().
        ///
        /// NOTE: this doesn't count the memory manager's internal default root and
        /// leaf memory pools.
        size_t numPools() const;

        MemoryAllocator *allocator();

        MemoryArbitrator *arbitrator();

        /// Returns debug string of this memory manager. If 'detail' is true, it
        /// returns the detailed tree memory usage from all the top level root memory
        /// pools.
        std::string toString(bool detail = false) const;

        /// Returns the memory manger's internal default root memory pool for testing
        /// purpose.
        MemoryPool &testingDefaultRoot() const {
            return *sysRoot_;
        }

        /// Returns the process wide leaf memory pool used for disk spilling.
        MemoryPool *spillPool() {
            return spillPool_.get();
        }

        /// Returns the process wide leaf memory pool used for ssd cache.
        MemoryPool *cachePool() {
            return cachePool_.get();
        }

        /// Returns the process wide leaf memory pool used for query tracing.
        MemoryPool *tracePool() const {
            return tracePool_.get();
        }

        const std::vector<std::shared_ptr<MemoryPool> > &testingSharedLeafPools() {
            return sharedLeafPools_;
        }

    private:
        std::shared_ptr<MemoryPoolImpl> createRootPool(
            std::string poolName,
            std::unique_ptr<MemoryReclaimer> &reclaimer,
            MemoryPool::Options &options);

        void dropPool(MemoryPool *pool);

        //  Returns the shared references to all the alive memory pools in 'pools_'.
        std::vector<std::shared_ptr<MemoryPool> > getAlivePools() const;

        const std::shared_ptr<MemoryAllocator> allocator_;

        // If not null, used to arbitrate the memory capacity among 'pools_'.
        const std::unique_ptr<MemoryArbitrator> arbitrator_;
        const uint16_t alignment_;
        const bool checkUsageLeak_;
        const bool coreOnAllocationFailureEnabled_;
        const bool disableMemoryPoolTracking_;
        const std::function<size_t(size_t)> getPreferredSize_;

        // The destruction callback set for the allocated root memory pools which are
        // tracked by 'pools_'. It is invoked on the root pool destruction and removes
        // the pool from 'pools_'.
        const MemoryPoolImpl::DestructionCallback poolDestructionCb_;

        const std::shared_ptr<MemoryPool> sysRoot_;
        const std::shared_ptr<MemoryPool> spillPool_;
        const std::shared_ptr<MemoryPool> cachePool_;
        const std::shared_ptr<MemoryPool> tracePool_;
        const std::vector<std::shared_ptr<MemoryPool> > sharedLeafPools_;

        mutable melon::SharedMutex mutex_;
        // All user root pools allocated from 'this'.
        melon::F14FastMap<std::string, std::weak_ptr<MemoryPool> > pools_;
    };

    /// Initializes the process-wide memory manager based on the specified
    /// 'options'.
    ///
    /// NOTE: user should only call this once on query system startup. Otherwise,
    /// the function throws.
    void initializeMemoryManager(const MemoryManagerOptions &options);

    /// Returns the process-wide memory manager.
    ///
    /// NOTE: user should have already initialized memory manager by calling.
    /// Otherwise, the function throws.
    MemoryManager *memoryManager();

    /// Deprecated. Do not use.
    MemoryManager &deprecatedDefaultMemoryManager();

    /// Deprecated. Do not use.
    /// Creates a leaf memory pool from the default memory manager for memory
    /// allocation use. If 'threadSafe' is true, then creates a leaf memory pool
    /// with thread-safe memory usage tracking.
    std::shared_ptr<MemoryPool> deprecatedAddDefaultLeafMemoryPool(
        const std::string &name = "",
        bool threadSafe = true);

    /// Default unmanaged leaf pool with no threadsafe stats support. Libraries
    /// using this method can get a pool that is shared with other threads. The goal
    /// is to minimize lock contention while supporting such use cases.
    ///
    /// TODO: deprecate this API after all the use cases are able to manage the
    /// lifecycle of the allocated memory pools properly.
    MemoryPool &deprecatedSharedLeafPool();

    /// Returns the system-wide memory pool for spilling memory usage.
    memory::MemoryPool *spillMemoryPool();

    /// Returns true if the provided 'pool' is the spilling memory pool.
    bool isSpillMemoryPool(memory::MemoryPool *pool);

    /// Returns the system-wide memory pool for tracing memory usage.
    memory::MemoryPool *traceMemoryPool();

    MELON_ALWAYS_INLINE int32_t alignmentPadding(void *address, int32_t alignment) {
        auto extra = reinterpret_cast<uintptr_t>(address) % alignment;
        return extra == 0 ? 0 : alignment - extra;
    }
} // namespace kumo::pollux::memory
